Kafka(七) |
您所在的位置:网站首页 › kafka指定时间戳消费 命令行 › Kafka(七) |
本篇博客要点如下: 一.环境准备 二.Kafka2.1.1常用命令总结 启动Kafka停止Kafka查看kafka消息列表创建topic删除topic消息生产消息消费查看某个topic详情修改分区数三.可能会用到的其它命令 通过命令行的方式修改Kafka topic配置快速定位某个topic异常的分区消费端从任意指定的偏移量开始消费数据使用指定的消费者组进行消费四.参考资料 一.环境准备 1. 本博客所列举的所有命令,均针对于Kafka2.0.0及以上版本,并不适用于低版本 2. 本博客所有命令,均是在正确配置好KAFKA_HOM 以及在PATH下面添加Kafka bin目录 PATH=$PATH:$KAFKA_HOME/bin 的基础上执行的 否则,需要去到kafka安装目录的bin目录下,执行相关脚本 二.Kafka2.1.1常用命令总结 启动KafkaKafka的正常运行,离不开Zookeeper提供的协调服务,因此首先要启动Zookeeper zkServer.sh start
使用jps命令,查看启动情况: jps如下图所示,证明Kafka启动成功 使用jps命令查看,已经没有Kakfa相关进程 运行结果如下 : 执行得到如下提示 : #topic first被标记删除 #但如果server.properties中未设置delete.topic.enable=true,不会有任何影响 Topic first is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.我们将server.properties中设置delete.topic.enable=true, 再来执行这个命令,看看结果如何 之后就可以在弹出来的输入框中生产消息 执行情况如下图: 我们在生产端生产一条消息 test 从下图中我们可以得知,如果不指定参数,是继续之前的偏移量消费 运行结果如下图所示 运行结果如下: # 这里给了一个警告 : 如果为具有关键字的主题增加了分区,则分区逻辑或消息的顺序将受到影响。 # 如果我们的业务场景对消息的顺序有着严格的要求, 一定要谨慎添加分区! 建议将之前所有的消息全部消费完才执行添加分区操作 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded!上面我们的分区数从1,添加到了4, 成功了 如果我们尝试减少分区数,会发生什么呢? # 首先给出了警告 # 然后报错,Kafka的分区数,只能增加,不能减少 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Error while executing topic command : The number of partitions for a topic can only be increased. Topic test currently has 4 partitions, 2 would not be an increase. [2020-02-24 16:52:20,845] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitionsfor a topic can only be increased. Topic test currently has 4 partitions, 2 would not be an increase. (kafka.admin.TopicCommand$)那么我们来思考一下,为什么Kafka的分区数不能减少? 假设Kafka的分区数可以减少.那么首先就有一个需要考虑的问题,比如删除掉的分区中的消息该作何处理? 如果随着分区一起消失则消息的可靠性得不到保障; 如果需要保留那么该如何保留呢? 如果直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于Spark、Flink这类需要消息时间戳(事件时间)的组件将会受到影响; 如果分散插入到现有的分区中,那么在消息量很大的时候,内部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何呢?从技术的角度来说,减少Kafka的分区数目应该不难,但是因为这个小小的功能所带来的一系列麻烦确实在是过于恐怖,索性不提供这样的功能出来 三.可能会用到的其它命令 通过命令行的方式修改Kafka topic配置我们可以通过 --config 参数来对正在create 或者 alter的主题进行配置的覆盖, 使创建的主题更能满足我们实际业务的需求 比如说: Kafka默认允许的最大消息大小是1M,但是对于视频或者图片信息来说, 我们认为这1M远远达不到实际需要,这个时候,就可以采用下面的命令: # 将topic test的允许最大消息大小设置为10M kafka-topics.sh --zookeeper linux02:2181 --alter --topic test --config max.message.bytes=10485760除了上述举例的参数,以下这些参数也是允许在创建或者更改的时候覆盖的 # configurations: cleanup.policy compression.type delete.retention.ms file.delete.delay.ms flush.messages flush.ms follower.replication.throttled. # replicas index.interval.bytes leader.replication.throttled.replicas max.message.bytes message.downconversion.enable message.format.version message.timestamp.difference.max.ms message.timestamp.type min.cleanable.dirty.ratio min.compaction.lag.ms min.insync.replicas preallocate retention.bytes retention.ms segment.bytes segment.index.bytes segment.jitter.ms segment.ms unclean.leader.election.enable 快速定位某个topic异常的分区假设我们的某一个topic业务量很庞大,分区数目很多. 忽然有一天,Kafka的某个节点挂了,我们就可以通过下面的命令来找出 有哪些分区的leader是不可用的 # 通过unavailable-partitions参数定位出 topic test leader不可用的分区 kafka-topics.sh --zookeeper linux01:2181 --describe --topic test --unavailable-partitions因为我集群里所有的节点都是正常的,所以执行这个命令什么都没有显示: 消息生产详情: 运行情况如下图所示: Kafka官方网站 |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |